home *** CD-ROM | disk | FTP | other *** search
/ Enter 2006 September / Enter 09 2006.iso / Internet / SpamExperts Home 1.1 / SpamExperts Home.exe / lib / spamexperts.modules / spamexperts / se_state.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2006-07-14  |  26.8 KB  |  862 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. '''
  5. SpamExperts State engine.
  6. '''
  7. import os
  8. import sys
  9. import time
  10. import email
  11. import types
  12. import errno
  13. import Queue
  14. import socket
  15. import _winreg
  16. import threading
  17. import xmlrpclib
  18. import email.Utils as email
  19. import email.Header as email
  20.  
  21. try:
  22.     import POW
  23. except ImportError:
  24.     POW = None
  25.  
  26. from spambayes.FileCorpus import FileCorpus
  27. from spambayes.storage import SpamTrainer, HamTrainer, Trainer
  28. from spambayes.message import open_storage as open_message_storage
  29. from spambayes.storage import ensureDir, open_storage, NO_TRAINING_FLAG
  30. from spamexperts.Options import options
  31. verbose = options[('globals', 'verbose')]
  32. options[('globals', 'verbose')] = False
  33. from spambayes.scripts.sb_server import close_platform_mutex
  34. options[('globals', 'verbose')] = verbose
  35. del verbose
  36. from spamexperts import se_stats
  37. from spamexperts import resources
  38. from spamexperts import LSPControl
  39. from spamexperts import se_logging
  40. from spamexperts import software_update
  41. from se_config import spamexpertsConfig
  42. from spamexperts import addressclassifier
  43. from spamexperts.fingerprint import fingerprint
  44. from spamexperts.message import SEHeaderMessage
  45. from spamexperts.Options import get_pathname_option
  46. from spamexperts.storage import TransitMessagesZODB
  47. from spamexperts.ProxyClassifier import ProxyClassifier
  48. from spamexperts.OptionsClass import ON, OFF, BLOCK_SPAM
  49. from spamexperts.FileCorpus import CarefulExpiryFileCorpus
  50. from spamexperts.FileCorpus import SEGzipFileMessageFactory
  51. from spamexperts.OptionsClass import BLOCKED, DELAYED, REMOVED
  52. from spamexperts.OptionsClass import IS_HAM, IS_SPAM, IS_UNSURE
  53. from spamexperts.FileCorpus import SEFileMessage, SEFileMessageFactory
  54.  
  55. class State(object):
  56.     
  57.     def __init__(self):
  58.         '''Initialises the State object that holds the state of the app.
  59.         The default settings are read from Options.py and bayescustomize.ini
  60.         and are then overridden by the command-line processing code in the
  61.         __main__ code below.'''
  62.         self.logFile = None
  63.         self.imapLogFile = None
  64.         self.bayes = None
  65.         self.platform_mutex = None
  66.         self.fingerprint = fingerprint.Fingerprint()
  67.         self.fingerprintclient = None
  68.         self.versioncontrol = software_update.VersionControl()
  69.         self.lspcontrol = LSPControl.LSPControl()
  70.         self.proxyListeners = []
  71.         self.proxies = []
  72.         self.open_remote_connections = []
  73.         self.reclassifier = ProxyClassifier()
  74.         ProxyClassifier.state = self
  75.         ProxyClassifier.init()
  76.         self.gzipCache = options[('Storage', 'cache_use_gzip')]
  77.         self.cacheExpiryDays = options[('Storage', 'cache_expiry_days')]
  78.         self.runTestServer = False
  79.  
  80.     
  81.     def init(self):
  82.         if not hasattr(self, 'model_notifier'):
  83.             raise AssertionError, 'The model needs to tell us about the notifier before init().'
  84.         self.checkLSPStatus()
  85.         SP = xmlrpclib.ServerProxy
  86.         zope_user = 'home'
  87.         zope_pass = '11073'
  88.         address = 'http://%s:%s@%s' % (zope_user, zope_pass, options[('fingerprint', 'zopeserver')])
  89.         self.fingerprintclient = SP(address)
  90.         logfile = os.path.join(se_logging.log_dir, '_pop3proxy.log')
  91.         self.logFile = open(logfile, 'wb', 0)
  92.         if options[('globals', 'verbose')]:
  93.             print >>sys.stderr, 'Opened', logfile
  94.         
  95.         logfile = os.path.join(se_logging.log_dir, '_imap4proxy.log')
  96.         self.imapLogFile = open(logfile, 'wb', 0)
  97.         if options[('globals', 'verbose')]:
  98.             print >>sys.stderr, 'Opened', logfile
  99.         
  100.         self.totalSessions = 0
  101.         self.activeSessions = 0
  102.         self.lastBaseMessageName = ''
  103.         self.uniquifier = 2
  104.         self.spam_handling_option = BLOCK_SPAM
  105.         self.status_switch = ON
  106.         self.p2p_switch = OFF
  107.         self.report_switch = ON
  108.         self.lsp_switch = ON
  109.         self.autoupdate_switch = True
  110.         self.update_available = False
  111.         self.createWorkers()
  112.         self.training_queue = Queue.Queue()
  113.         self.training_thread = threading.Thread(target = self.trainQueue)
  114.         self.training_thread.setDaemon(True)
  115.         self.training_thread.start()
  116.         self.fingerprint_queue = Queue.Queue()
  117.         self.fingerprint_thread = threading.Thread(target = self.fingerprintQueue)
  118.         self.fingerprint_thread.setDaemon(True)
  119.         self.fingerprint_thread.start()
  120.         if POW:
  121.             key_fn = os.path.join(resources.application_directory(), options[('globals', 'ssl_keyfile')])
  122.             certificate_fn = os.path.join(resources.application_directory(), options[('globals', 'ssl_certificate_file')])
  123.             self.rsa = POW.pemRead(POW.RSA_PRIVATE_KEY, open(key_fn).read(), 'spamexperts')
  124.             self.x509 = POW.pemRead(POW.X509_CERTIFICATE, open(certificate_fn).read())
  125.         else:
  126.             self.rsa = None
  127.             self.x509 = None
  128.  
  129.     
  130.     def _store_and_close_db(self, db_att):
  131.         if hasattr(self, db_att):
  132.             getattr(self, db_att).store()
  133.             
  134.             try:
  135.                 getattr(self, db_att).close()
  136.             except OSError:
  137.                 pass
  138.             except:
  139.                 None<EXCEPTION MATCH>OSError
  140.             
  141.  
  142.         None<EXCEPTION MATCH>OSError
  143.  
  144.     
  145.     def close(self, callback = None):
  146.         if not callback:
  147.             
  148.             callback = lambda x: x
  149.         
  150.         for proxy in self.proxies:
  151.             if hasattr(proxy, 'current_account') and proxy.current_account is not None and proxy.current_account.endswith('IMAP'):
  152.                 if options[('globals', 'verbose')]:
  153.                     print 'Closing', proxy.current_account
  154.                 
  155.                 proxy.terminated = True
  156.                 proxy.close()
  157.                 continue
  158.         
  159.         callback(60)
  160.         if options[('globals', 'verbose')]:
  161.             print 'Closed all IMAP proxies.'
  162.         
  163.         now = time.time()
  164.         count = 0
  165.         while time.time() - now < 60:
  166.             if not self.proxies:
  167.                 break
  168.             
  169.             callback(count)
  170.             count += 1
  171.             time.sleep(1)
  172.         for proxy in self.proxies:
  173.             if options[('globals', 'verbose')]:
  174.                 print 'Forcing proxy closed',
  175.                 if hasattr(proxy, 'current_account'):
  176.                     print proxy.current_account
  177.                 else:
  178.                     print 
  179.             
  180.             proxy.terminated = True
  181.             proxy.close()
  182.         
  183.         callback(65)
  184.         if options[('globals', 'verbose')]:
  185.             print 'Closed all proxies.'
  186.         
  187.         ProxyClassifier.finish_processing = True
  188.         ProxyClassifier.processing_thread.join()
  189.         callback(70)
  190.         if options[('globals', 'verbose')]:
  191.             print 'Finished processing thread.'
  192.         
  193.         close_platform_mutex(self.platform_mutex)
  194.         self.platform_mutex = None
  195.         callback(80)
  196.         if options[('globals', 'verbose')]:
  197.             print 'Released mutex.'
  198.         
  199.         if hasattr(self, 'training_thread'):
  200.             self.training_thread.join()
  201.         
  202.         callback(90)
  203.         if options[('globals', 'verbose')]:
  204.             print 'Finished training thread.'
  205.         
  206.         if self.bayes is not None:
  207.             if self.bayes.nham != 0 and self.bayes.nspam != 0:
  208.                 state.bayes.store()
  209.             
  210.             
  211.             try:
  212.                 self.bayes.close()
  213.             except OSError:
  214.                 pass
  215.  
  216.             self.bayes = None
  217.         
  218.         for db_att in ('address_classifier', 'blocked_messages', 'delayed_messages', 'delete_messages', 'message_info_database'):
  219.             self._store_and_close_db(db_att)
  220.         
  221.         if options[('globals', 'verbose')]:
  222.             print 'Stored and closed all databases.'
  223.         
  224.         self.spamCorpus = None
  225.         self.hamCorpus = None
  226.         self.unsureCorpus = None
  227.         self.spamTrainer = None
  228.         self.hamTrainer = None
  229.         if options[('globals', 'verbose')]:
  230.             print >>sys.stderr, 'State is closed.'
  231.         
  232.         callback(99)
  233.  
  234.     
  235.     def createWorkers(self):
  236.         '''Using the options that were initialised in __init__ and then
  237.         possibly overridden by the driver code, create the Bayes object,
  238.         the Corpuses, the Trainers and so on.'''
  239.         if options[('globals', 'verbose')] and options[('globals', 'verbose_level')] > 2:
  240.             print >>sys.stderr, 'Setting up application data environment.'
  241.         
  242.         if not hasattr(self, 'DBName'):
  243.             self.DBName = get_pathname_option('Storage', 'persistent_storage_file')
  244.         
  245.         if not hasattr(self, 'useDB'):
  246.             self.useDB = options[('Storage', 'persistent_use_database')]
  247.         
  248.         if options[('globals', 'verbose')] and options[('globals', 'verbose_level')] > 2:
  249.             print >>sys.stderr, 'Got DBName: %s and useDB: %s' % (self.DBName, self.useDB)
  250.         
  251.         self.bayes = open_storage(self.DBName, self.useDB)
  252.         message_info_db_name = get_pathname_option('Storage', 'messageinfo_storage_file')
  253.         if options[('globals', 'verbose')] and options[('globals', 'verbose_level')] > 2:
  254.             print >>sys.stderr, 'messageinfodb name: %s' % (message_info_db_name,)
  255.         
  256.         spamexpertsConfig.msginfoDB = open_message_storage(message_info_db_name, self.useDB)
  257.         self.message_info_database = spamexpertsConfig.msginfoDB
  258.         SEHeaderMessage().message_info_db = spamexpertsConfig.msginfoDB
  259.         fn = get_pathname_option('Storage', 'address_list_storage_file')
  260.         if self.useDB == 'dbm':
  261.             self.address_classifier = addressclassifier.DBDictAddressClassifier(fn)
  262.         elif self.useDB == 'zodb':
  263.             self.address_classifier = addressclassifier.ZODBAddressClassifier(fn)
  264.         else:
  265.             self.address_classifier = addressclassifier.ZODBAddressClassifier(fn)
  266.         bm = get_pathname_option('Storage', 'blocked_messages_file')
  267.         self.blocked_messages = TransitMessagesZODB(bm)
  268.         dm = get_pathname_option('Storage', 'delayed_messages_file')
  269.         self.delayed_messages = TransitMessagesZODB(dm)
  270.         dd = get_pathname_option('Storage', 'delete_messages_file')
  271.         self.delete_messages = TransitMessagesZODB(dd)
  272.         sc = get_pathname_option('Storage', 'spam_cache')
  273.         hc = get_pathname_option('Storage', 'ham_cache')
  274.         uc = get_pathname_option('Storage', 'unsure_cache')
  275.         wc = get_pathname_option('Storage', 'to_be_removed_cache')
  276.         if options[('globals', 'verbose')]:
  277.             print 'Spam cache:', sc
  278.             print 'Ham cache:', hc
  279.             print 'Unsure cache:', uc
  280.             print 'Waiting cache:', wc
  281.         
  282.         map(ensureDir, [
  283.             sc,
  284.             hc,
  285.             uc,
  286.             wc])
  287.         if self.gzipCache:
  288.             factory = SEGzipFileMessageFactory()
  289.         else:
  290.             factory = SEFileMessageFactory()
  291.         age = options[('Storage', 'cache_expiry_days')] * 24 * 60 * 60
  292.         self.spamCorpus = CarefulExpiryFileCorpus(age, self.blocked_messages, factory, sc, '*', cacheSize = 20)
  293.         self.hamCorpus = CarefulExpiryFileCorpus(age, self.delayed_messages, factory, hc, '*', cacheSize = 20)
  294.         self.unsureCorpus = CarefulExpiryFileCorpus(age, self.delayed_messages, factory, uc, '*', cacheSize = 20)
  295.         self.waitingCorpus = FileCorpus(factory, wc, '*', cacheSize = 1)
  296.         self.spamCorpus.removeExpiredMessages()
  297.         self.hamCorpus.removeExpiredMessages()
  298.         self.unsureCorpus.removeExpiredMessages()
  299.         self.spamTrainer = SpamTrainer(self.bayes)
  300.         self.hamTrainer = HamTrainer(self.bayes)
  301.         self.fingerprintSpamTrainer = FingerprintSpamTrainer(self.fingerprint, self.fingerprintclient)
  302.         self.fingerprintHamTrainer = FingerprintHamTrainer(self.fingerprint, self.fingerprintclient)
  303.         self.spamCorpus.addObserver(self.spamTrainer)
  304.         self.spamCorpus.addObserver(self.fingerprintSpamTrainer)
  305.         self.hamCorpus.addObserver(self.hamTrainer)
  306.         self.hamCorpus.addObserver(self.fingerprintHamTrainer)
  307.         self.numSpams = len(self.spamCorpus)
  308.         self.numHams = len(self.hamCorpus)
  309.         self.statistics = se_stats.SEStats(options, self.message_info_database, SEHeaderMessage)
  310.         for key in self.delayed_messages.classifier.db.keys():
  311.             account = self.delayed_messages[key]
  312.             for msg_id in account.keys():
  313.                 if msg_id in self.hamCorpus.keys() and msg_id in self.waitingCorpus.keys() or msg_id in self.unsureCorpus.keys():
  314.                     continue
  315.                 
  316.                 print >>sys.stderr, 'Message', msg_id, "isn't in cache."
  317.                 del account[msg_id]
  318.             
  319.             self.delayed_messages[key] = account
  320.         
  321.         self.delayed_messages.store()
  322.         if options[('globals', 'verbose')]:
  323.             print >>sys.stderr, 'CreateWorkers finished.'
  324.         
  325.  
  326.     
  327.     def getNewMessageName(self):
  328.         messageName = '%10.10d' % long(time.time())
  329.         if messageName == self.lastBaseMessageName:
  330.             messageName = '%s-%d' % (messageName, self.uniquifier)
  331.             self.uniquifier += 1
  332.         else:
  333.             self.lastBaseMessageName = messageName
  334.             self.uniquifier = 2
  335.         return messageName
  336.  
  337.     
  338.     def checkLSPStatus(self):
  339.         '''Check the status of the LSP and set the GUI setting accordingly.
  340.         '''
  341.         actualstatus = self.lspcontrol.isInstalled()
  342.         guistatus = spamexpertsConfig.enable_lsp
  343.         if actualstatus != guistatus:
  344.             spamexpertsConfig.enable_lsp = actualstatus
  345.         
  346.  
  347.     
  348.     def enableLSP(self):
  349.         '''Enable the LSP.'''
  350.         
  351.         try:
  352.             self.lspcontrol.installLSP()
  353.             return True
  354.         except AssertionError:
  355.             return False
  356.  
  357.  
  358.     
  359.     def disableLSP(self):
  360.         '''Disable the LSP.'''
  361.         
  362.         try:
  363.             self.lspcontrol.removeLSP()
  364.             return True
  365.         except AssertionError:
  366.             return False
  367.  
  368.  
  369.     
  370.     def isLSPEnabled(self):
  371.         return self.lspcontrol.isInstalled()
  372.  
  373.     
  374.     class _dummy_msg(object):
  375.         '''Pretends to be a message, but without all the baggage.'''
  376.         
  377.         def __init__(self, key, corpus):
  378.             
  379.             self.key = lambda : key
  380.             self.corpus = corpus
  381.  
  382.         
  383.         def load(self):
  384.             self.msg = self.corpus[self.key()]
  385.  
  386.         
  387.         def __getitem__(self, key):
  388.             return self.msg[key]
  389.  
  390.  
  391.     
  392.     def _storeDate(self, msg_id, corpus):
  393.         msg = corpus[msg_id]
  394.         msg.load()
  395.         new_header = email.Utils.formatdate(time.time(), True)
  396.         msg['X-SpamExperts-Date'] = new_header
  397.         msg.store()
  398.         corpus.cacheMessageHeaders(msg)
  399.         return email.Utils.parsedate_tz(new_header)
  400.  
  401.     
  402.     def _getIndividualHeaders(self, msg_id, corpus):
  403.         headers = corpus.get_headers(self._dummy_msg(msg_id, corpus))
  404.         if not headers['X-SpamExperts-Date'] and headers[spamexpertsConfig.date_header] and headers['Delivery-Date']:
  405.             pass
  406.         d = headers['Date']
  407.         if d is None:
  408.             date = time.time()
  409.             self._storeDate(msg_id, corpus)
  410.         else:
  411.             time_tuple = email.Utils.parsedate_tz(d)
  412.             if not time_tuple:
  413.                 time_tuple = self._storeDate(msg_id, corpus)
  414.             
  415.             
  416.             try:
  417.                 date = email.Utils.mktime_tz(time_tuple)
  418.             except OverflowError:
  419.                 date = 0.0
  420.  
  421.         subject = self.get_header(headers['Subject'])
  422.         from_ = self.get_header(headers['From'])
  423.         to = self.get_header(headers['To'])
  424.         realfrom = from_
  425.         from_ = email.Utils.parseaddr(from_)
  426.         if not from_[0]:
  427.             pass
  428.         from_ = from_[1]
  429.         return (subject, from_, to, realfrom, date)
  430.  
  431.     
  432.     def get_header(header):
  433.         h = []
  434.         for v, charset in email.Header.decode_header(header):
  435.             
  436.             try:
  437.                 if not charset:
  438.                     pass
  439.                 h.append(unicode(v, 'latin-1'))
  440.             continue
  441.             except (LookupError, UnicodeDecodeError):
  442.                 h.append(unicode(v, 'latin-1'))
  443.                 continue
  444.             
  445.  
  446.         
  447.         return ''.join(h)
  448.  
  449.     get_header = staticmethod(get_header)
  450.     
  451.     def _getMessagesHeadersFromCorpus(self, corpus, klass):
  452.         '''Return all message headers from the corpus object in a dict.'''
  453.         messages = { }
  454.         klass = {
  455.             IS_HAM: 'ham',
  456.             IS_SPAM: 'spam',
  457.             IS_UNSURE: 'unsure' }[klass]
  458.         
  459.         try:
  460.             keys = corpus.keys()
  461.         except AttributeError:
  462.             return messages
  463.  
  464.         for i, msg_id in enumerate(keys):
  465.             (subject, from_, to, realfrom, date) = self._getIndividualHeaders(msg_id, corpus)
  466.             messages[i] = (from_, subject, date, None, to, klass, msg_id, realfrom)
  467.         
  468.         return messages
  469.  
  470.     
  471.     def getSpamMessagesHeaders(self):
  472.         '''Return a dict containing all headers for each message from
  473.         the spam corpus.'''
  474.         return self._getMessagesHeadersFromCorpus(self.spamCorpus, IS_SPAM)
  475.  
  476.     
  477.     def getHamMessagesHeaders(self):
  478.         '''Return a dict containing all headers for each message from
  479.         the ham corpus.'''
  480.         return self._getMessagesHeadersFromCorpus(self.hamCorpus, IS_HAM)
  481.  
  482.     
  483.     def getUnsureMessagesHeaders(self):
  484.         '''Return a dict containing all headers for each message from
  485.         the unsure corpus.'''
  486.         return self._getMessagesHeadersFromCorpus(self.unsureCorpus, IS_UNSURE)
  487.  
  488.     
  489.     def _getMissingMessage(self, msgid, corpus):
  490.         body = 'Sorry, SpamExperts was unable to find this message (%s) in its cache.\r\n\r\nPlease restart SpamExperts as soon as is convenient for you.' % (msgid,)
  491.         
  492.         try:
  493.             headers = corpus.headers[msgid]
  494.             headers = [ '%s:%s' % (k, v) for k, v in headers.iteritems() ]
  495.         except KeyError:
  496.             headers = ('Subject: Missing message.', 'From: unknown@invalid.com', 'To: unknown@invalid.com')
  497.  
  498.         message_text = '%s\r\n\r\n%s' % ('\r\n'.join(headers), body)
  499.         msg = email.message_from_string(message_text, _class = SEFileMessage)
  500.         msg.loaded = True
  501.         return msg
  502.  
  503.     
  504.     def getMessage(self, msgid, klass):
  505.         '''Return a tuple of from address, to address, subject, and body of
  506.         the specified message.'''
  507.         corpora = {
  508.             IS_HAM: self.hamCorpus,
  509.             IS_SPAM: self.spamCorpus,
  510.             IS_UNSURE: self.unsureCorpus }
  511.         corpora_to_try = [
  512.             corpora[klass]] + corpora.values()
  513.         for corpus in corpora_to_try:
  514.             
  515.             try:
  516.                 msg = corpus.get(msgid)
  517.             continue
  518.             except IOError:
  519.                 msg = self._getMissingMessage(msgid, corpus)
  520.                 continue
  521.             
  522.  
  523.         else:
  524.             print "Couldn't find", msgid
  525.             msg = self._getMissingMessage(msgid, corpus)
  526.         if msg is None:
  527.             print "Couldn't find", msgid
  528.             msg = self._getMissingMessage(msgid, corpus)
  529.         
  530.         
  531.         try:
  532.             msg.load()
  533.         except IOError:
  534.             msg = self._getMissingMessage(msgid, corpus)
  535.  
  536.         body = msg.as_html()
  537.         if not isinstance(body, types.UnicodeType):
  538.             body = unicode(body, 'latin-1')
  539.         
  540.         (subject, from_, to, unused, unused) = self._getIndividualHeaders(msgid, corpus)
  541.         return (from_, to, subject, body)
  542.  
  543.     
  544.     def removeMessageFromCorpus(self, msg_id, klass):
  545.         corpus = {
  546.             IS_HAM: self.hamCorpus,
  547.             IS_SPAM: self.spamCorpus,
  548.             IS_UNSURE: self.unsureCorpus }[klass]
  549.         msg = corpus.get(msg_id)
  550.         if msg is None:
  551.             print >>sys.stderr, "Can't find message to remove", msg_id, klass
  552.             return None
  553.         
  554.         msg.load()
  555.         msg.setId(msg_id)
  556.         if msg.getBlockingState()[1] == DELAYED:
  557.             if options[('globals', 'verbose')]:
  558.                 print >>sys.stderr, "Can't remove message %s, it is still to be delivered.  Moving aside." % (msg_id,)
  559.             
  560.             FileCorpus.removeMessage(corpus, msg, observer_flags = NO_TRAINING_FLAG)
  561.             self.waitingCorpus.addMessage(msg)
  562.             return None
  563.         
  564.         corpus.removeMessage(msg, observer_flags = NO_TRAINING_FLAG)
  565.  
  566.     
  567.     def moveAndTrainMessages(self, msg_ids, sourceClass, destClass):
  568.         '''Move (and appropriately train) a message or list of messages.'''
  569.         self.training_queue.put((msg_ids, sourceClass, destClass))
  570.  
  571.     
  572.     def fingerprintQueue(self):
  573.         '''Background updating of the fingerprint server.  This method
  574.         should be started up in a separate thread at init() and will run
  575.         until close().'''
  576.         while self.platform_mutex is not None:
  577.             
  578.             try:
  579.                 msg = self.fingerprint_queue.get_nowait()
  580.             except Queue.Empty:
  581.                 time.sleep(1)
  582.                 continue
  583.  
  584.             self.fingerprintSpamTrainer.train(msg)
  585.  
  586.     
  587.     def trainQueue(self):
  588.         '''Thread-safe training: train messages as they arrive in the
  589.         training queue.  This method should be started up in a separate
  590.         thread at init() and will run until close().'''
  591.         while self.platform_mutex is not None:
  592.             
  593.             try:
  594.                 via_moving = False
  595.                 training_item = self.training_queue.get_nowait()
  596.                 if len(training_item) == 3:
  597.                     (msg_ids, sourceClass, destClass) = training_item
  598.                     via_moving = True
  599.                 elif len(training_item) == 2:
  600.                     (msg, isSpam) = training_item
  601.                 else:
  602.                     print >>sys.stderr, 'Incorrect training data', training_item
  603.             except Queue.Empty:
  604.                 time.sleep(1)
  605.                 continue
  606.  
  607.             
  608.             try:
  609.                 if via_moving:
  610.                     self._moveAndTrainMessages(msg_ids, sourceClass, destClass)
  611.                     if sourceClass == IS_HAM:
  612.                         was_spam = options[('Headers', 'header_ham_string')]
  613.                     elif sourceClass == IS_SPAM:
  614.                         was_spam = options[('Headers', 'header_spam_string')]
  615.                     else:
  616.                         was_spam = options[('Headers', 'header_unsure_string')]
  617.                     if destClass == IS_HAM:
  618.                         self.statistics.RecordTraining(True, old_class = was_spam)
  619.                     elif destClass == IS_SPAM:
  620.                         self.statistics.RecordTraining(False, old_class = was_spam)
  621.                     
  622.                 else:
  623.                     self.bayes.learn(msg.tokenize(), isSpam)
  624.                     self.statistics.RecordTraining(not isSpam)
  625.                     msg.RememberTrained(isSpam)
  626.                 self.bayes.store()
  627.             continue
  628.             except (SystemError, SystemExit, KeyboardInterrupt):
  629.                 raise 
  630.                 continue
  631.                 except Exception:
  632.                     e = None
  633.                     print >>sys.stderr, 'Error in moving/training: %s' % (e,)
  634.                     if options[('globals', 'verbose')]:
  635.                         import traceback
  636.                         traceback.print_exc(None, sys.stderr)
  637.                     
  638.                     options[('globals', 'verbose')]
  639.                 
  640.                 None<EXCEPTION MATCH>Exception
  641.             return None
  642.  
  643.  
  644.     
  645.     def _moveAndTrainMessages(self, msg_ids, sourceClass, destClass):
  646.         sourceCorpus = {
  647.             IS_HAM: self.hamCorpus,
  648.             IS_SPAM: self.spamCorpus,
  649.             IS_UNSURE: self.unsureCorpus }[sourceClass]
  650.         destCorpus = {
  651.             IS_HAM: self.hamCorpus,
  652.             IS_SPAM: self.spamCorpus,
  653.             IS_UNSURE: self.unsureCorpus }[destClass]
  654.         if isinstance(msg_ids, types.StringType):
  655.             msg_ids = (msg_ids,)
  656.         
  657.         for msg_id in msg_ids:
  658.             if sourceClass == IS_SPAM:
  659.                 self.numSpams -= 1
  660.             elif sourceClass == IS_HAM:
  661.                 self.numHams -= 1
  662.             
  663.             if destClass == IS_SPAM:
  664.                 self.numSpams += 1
  665.                 asSpam = True
  666.             elif destClass == IS_UNSURE:
  667.                 asSpam = None
  668.             elif destClass == IS_HAM:
  669.                 self.numHams += 1
  670.                 asSpam = False
  671.             
  672.             destCorpus.takeMessage(msg_id, sourceCorpus, fromCache = True)
  673.             msg = destCorpus.get(msg_id)
  674.             if msg is None:
  675.                 return None
  676.             
  677.             msg.RememberTrained(asSpam)
  678.             (score, classification, clues) = self.reclassifier.classify_message(msg, NO_TRAINING_FLAG)
  679.             msg.delHeaders()
  680.             msg.addHeaders(prob = score, clues = clues)
  681.             msg.store()
  682.             destCorpus.cacheMessageHeaders(msg)
  683.             (account, block_state) = msg.getBlockingState()
  684.             new_state = None
  685.             to_db = None
  686.             from_db = None
  687.             if sourceClass == IS_SPAM:
  688.                 if destClass == IS_HAM or destClass == IS_UNSURE:
  689.                     if block_state == BLOCKED or block_state == REMOVED:
  690.                         new_state = DELAYED
  691.                         
  692.                         try:
  693.                             msg_info = self.blocked_messages.get(account)[msg_id]
  694.                         except KeyError:
  695.                             pass
  696.  
  697.                         old = self.blocked_messages.get(account)
  698.                         del old[msg_id]
  699.                         self.blocked_messages[account] = old
  700.                         self.blocked_messages.store()
  701.                         old = self.delayed_messages.get(account)
  702.                         old[msg_id] = msg_info
  703.                         self.delayed_messages[account] = old
  704.                         self.delayed_messages.store()
  705.                         old = self.delete_messages.get(account)
  706.                         
  707.                         try:
  708.                             del old[msg_id]
  709.                         except KeyError:
  710.                             pass
  711.  
  712.                         self.delete_messages[account] = old
  713.                         self.delete_messages.store()
  714.                     elif (sourceClass == IS_HAM or sourceClass == IS_UNSURE) and destClass == IS_SPAM and block_state == DELAYED:
  715.                         new_state = BLOCKED
  716.                         
  717.                         try:
  718.                             msg_info = self.delayed_messages.get(account)[msg_id]
  719.                         except KeyError:
  720.                             msg_info = { }
  721.  
  722.                         old = self.delayed_messages.get(account)
  723.                         del old[msg_id]
  724.                         self.delayed_messages[account] = old
  725.                         self.delayed_messages.store()
  726.                         old = self.blocked_messages.get(account)
  727.                         old[msg_id] = msg_info
  728.                         self.blocked_messages[account] = old
  729.                         self.blocked_messages.store()
  730.                         old = self.delete_messages.get(account)
  731.                         old[msg_id] = msg_info
  732.                         if options[('globals', 'verbose')]:
  733.                             print 'Scheduling deletion of', msg_id
  734.                         
  735.                         self.delete_messages[account] = old
  736.                         self.delete_messages.store()
  737.                     
  738.             if new_state:
  739.                 msg.rememberBlockingState(account, new_state)
  740.                 continue
  741.         
  742.  
  743.  
  744.  
  745. class FingerprintTrainer(Trainer):
  746.     '''Associates a fingerprint server and one or more corpora.
  747.  
  748.     This class is designed to be a corpus observer.'''
  749.     
  750.     def __init__(self, fingerprint, fingerprint_server, is_spam):
  751.         self.fingerprint = fingerprint
  752.         self.fingerprint_server = fingerprint_server
  753.         self.is_spam = is_spam
  754.  
  755.  
  756.  
  757. class FingerprintSpamTrainer(FingerprintTrainer):
  758.     '''Fingerprint trainer for spam.'''
  759.     
  760.     def __init__(self, fingerprint, fingerprint_server):
  761.         FingerprintTrainer.__init__(self, fingerprint, fingerprint_server, True)
  762.  
  763.     
  764.     def _get_fingerprints(self, msg):
  765.         if hasattr(msg, 'load'):
  766.             msg.load()
  767.         
  768.         msg_fingerprints = list(self.fingerprint.get_fingerprint(msg))
  769.         return {
  770.             'fingerprints': msg_fingerprints,
  771.             'user_id': spamexpertsConfig.user_id }
  772.  
  773.     
  774.     def train(self, msg):
  775.         '''Train the fingerprint database with the message.'''
  776.         mapping = self._get_fingerprints(msg)
  777.         old_timeout = socket.getdefaulttimeout()
  778.         
  779.         try:
  780.             socket.setdefaulttimeout(10)
  781.             for unused in xrange(3):
  782.                 
  783.                 try:
  784.                     result = self.fingerprint_server.store(mapping)
  785.                 except socket.error:
  786.                     e = None
  787.                     print >>sys.stderr, 'Timed out updating FP.'
  788.                     time.sleep(0.5)
  789.                     continue
  790.                     except Exception:
  791.                         e = None
  792.                         time.sleep(0.5)
  793.                         continue
  794.                     elif result[0] != True:
  795.                         print >>sys.stderr, 'FP Error', result
  796.                     
  797.  
  798.                 return None
  799.         finally:
  800.             socket.setdefaulttimeout(old_timeout)
  801.  
  802.         print >>sys.stderr, 'Could not update fingerprint server.', str(e)
  803.         if options[('globals', 'verbose')]:
  804.             import traceback
  805.             traceback.print_exc(None, sys.stderr)
  806.         
  807.  
  808.     
  809.     def untrain(self, msg):
  810.         '''Untrain the fingerprint database with the message.'''
  811.         mapping = self._get_fingerprints(msg)
  812.         old_timeout = socket.getdefaulttimeout()
  813.         server = self.fingerprint_server
  814.         
  815.         try:
  816.             socket.setdefaulttimeout(10)
  817.             for unused in xrange(3):
  818.                 
  819.                 try:
  820.                     result = server.dec_report_count(mapping)
  821.                 except socket.error:
  822.                     e = None
  823.                     print >>sys.stderr, 'Timed out updating FP.'
  824.                     time.sleep(0.5)
  825.                     continue
  826.                     except Exception:
  827.                         e = None
  828.                         time.sleep(0.5)
  829.                         continue
  830.                     elif result[0] != True:
  831.                         print >>sys.stderr, 'FP Error', result
  832.                     
  833.  
  834.                 return None
  835.         finally:
  836.             socket.setdefaulttimeout(old_timeout)
  837.  
  838.         print >>sys.stderr, 'Could not update fingerprint server.', str(e)
  839.         if options[('globals', 'verbose')]:
  840.             import traceback
  841.             traceback.print_exc(None, sys.stderr)
  842.         
  843.  
  844.  
  845.  
  846. class FingerprintHamTrainer(FingerprintTrainer):
  847.     '''Fingerprint trainer for ham.'''
  848.     
  849.     def __init__(self, fingerprint, fingerprint_server):
  850.         FingerprintTrainer.__init__(self, fingerprint, fingerprint_server, False)
  851.  
  852.     
  853.     def train(self, unused):
  854.         pass
  855.  
  856.     
  857.     def untrain(self, unused):
  858.         pass
  859.  
  860.  
# Module-level State instance, created once when this module is imported.
# NOTE(review): presumably the shared engine state that other modules
# reach via this module's ``state`` attribute -- confirm against callers.
state = State()
  862.